11 ROS2 实践课-服务通信节点开发与调试
ROS2 服务通信节点开发与调试
关联:索引
- 场景提问:你要下发“急停”指令,最关键的响应信息应该是什么?
1. srv 文件的基本结构
# Request
<字段类型> <字段名>
...
---
# Response
<字段类型> <字段名>
...
- 业务可解释:字段名能对应业务含义(谁、要干什么、参数是什么、要不要强制)。
- 可追溯:建议包含
request_id便于复盘(至少要能在日志里定位一次请求);时间戳可用float64 timestamp(秒)或builtin_interfaces/Time stamp(需在接口包声明依赖)。 - 幂等思维:重复的“触发急停/解除急停”不应造成异常(可用
error_code=3表示重复指令)。
服务名建议:/sorting/device/emergency_stop
# 请求字段(客户端→服务端)
string request_id
float64 timestamp
bool estop_trigger
---
# 响应字段(服务端→客户端)
bool ok
uint8 error_code
string request_id
float64 execution_time
string device_final_status
float64 response_timestamp
1. 创建工作空间与接口包(推荐接口独立成包)
source /opt/ros/humble/setup.bash
mkdir -p ~/ros2_ws/src
cd ~/ros2_ws/src
ros2 pkg create sorting_interfaces --build-type ament_cmake
说明:接口包选用 ament_cmake 是 ROS2 里生成 .srv/.msg/.action 的常见做法,不代表必须写 C++;服务端/客户端节点依然可以用 ament_python 开发。
建议目录:
-
~/ros2_ws/src/sorting_interfaces/srv/DeviceEmergencyStop.srv -
package.xml:声明接口生成依赖、运行依赖,并把该包标记为接口包。 -
CMakeLists.txt:调用rosidl_generate_interfaces(...)生成 srv 类型,并导出运行时依赖。
package.xml(接口包最小可用模板,可直接替换包名/描述/维护者信息):
<?xml version="1.0"?>
<package format="3">
<name>sorting_interfaces</name>
<version>0.0.0</version>
<description>Interface definitions for sorting robot</description>
<maintainer email="[email protected]">teacher</maintainer>
<license>Apache-2.0</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>
如果 .srv 里使用了 builtin_interfaces/Time,在 package.xml 里额外加一行依赖:
<depend>builtin_interfaces</depend>
CMakeLists.txt(接口包最小可用模板,把 srv 文件名填进去):
cmake_minimum_required(VERSION 3.8)
project(sorting_interfaces)
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
set(srv_files
"srv/DeviceEmergencyStop.srv"
)
rosidl_generate_interfaces(${PROJECT_NAME}
${srv_files}
)
ament_export_dependencies(rosidl_default_runtime)
ament_package()
如果 .srv 里使用了 builtin_interfaces/Time,在 CMakeLists.txt 里补两处:
find_package(builtin_interfaces REQUIRED)
以及在 rosidl_generate_interfaces(...) 中增加:
DEPENDENCIES builtin_interfaces
3. 编译与环境刷新
cd ~/ros2_ws
colcon build --symlink-install
source /opt/ros/humble/setup.bash
source install/setup.bash
4. 生成结果验证(必须能用命令看到接口)
查看接口是否已注册:
ros2 interface list | grep sorting_interfaces
查看 srv 具体字段:
ros2 interface show sorting_interfaces/srv/DeviceEmergencyStop
5. 小组产出(当堂提交)
- 一张“srv 接口设计卡”(服务名 + Request/Response 字段 + 每个字段的业务含义)
- 快问快答:为什么自定义 srv 不编译就不能在节点里 import 使用?
1. 创建服务功能包(Python 示例)
source /opt/ros/humble/setup.bash
cd ~/ros2_ws/src
ros2 pkg create sorting_service_practice --build-type ament_python --dependencies rclpy sorting_interfaces
建议文件路径:
~/ros2_ws/src/sorting_service_practice/sorting_service_practice/emergency_stop_server.py~/ros2_ws/src/sorting_service_practice/sorting_service_practice/emergency_stop_client.py
入口点配置(setup.py)示例结构:
entry_points={
'console_scripts': [
'emergency_stop_server = sorting_service_practice.emergency_stop_server:main',
'emergency_stop_client = sorting_service_practice.emergency_stop_client:main',
],
},
实现代码(可直接复制):
服务端:emergency_stop_server.py
import time
import rclpy
from rclpy.node import Node
from sorting_interfaces.srv import DeviceEmergencyStop
class EmergencyStopServer(Node):
def __init__(self) -> None:
super().__init__('emergency_stop_server')
self._device_status = 'RUNNING'
self._service = self.create_service(
DeviceEmergencyStop,
'/sorting/device/emergency_stop',
self._handle_emergency_stop,
)
def _handle_emergency_stop(
self,
request: DeviceEmergencyStop.Request,
response: DeviceEmergencyStop.Response,
) -> DeviceEmergencyStop.Response:
start_perf = time.perf_counter()
response.request_id = request.request_id
if not request.request_id or request.timestamp <= 0.0:
response.ok = False
response.error_code = 1
response.device_final_status = self._device_status
response.execution_time = time.perf_counter() - start_perf
response.response_timestamp = time.time()
return response
if request.estop_trigger and self._device_status == 'STOPPED':
response.ok = False
response.error_code = 3
response.device_final_status = self._device_status
response.execution_time = time.perf_counter() - start_perf
response.response_timestamp = time.time()
return response
if (not request.estop_trigger) and self._device_status == 'RUNNING':
response.ok = False
response.error_code = 3
response.device_final_status = self._device_status
response.execution_time = time.perf_counter() - start_perf
response.response_timestamp = time.time()
return response
self._device_status = 'STOPPED' if request.estop_trigger else 'RUNNING'
response.ok = True
response.error_code = 0
response.device_final_status = self._device_status
response.execution_time = time.perf_counter() - start_perf
response.response_timestamp = time.time()
return response
def main(args=None) -> None:
rclpy.init(args=args)
node = EmergencyStopServer()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
客户端:emergency_stop_client.py
import time
import rclpy
from rclpy.node import Node
from sorting_interfaces.srv import DeviceEmergencyStop
def main(args=None) -> None:
rclpy.init(args=args)
node = Node('emergency_stop_client')
client = node.create_client(DeviceEmergencyStop, '/sorting/device/emergency_stop')
if not client.wait_for_service(timeout_sec=3.0):
node.get_logger().error('service not available: /sorting/device/emergency_stop')
node.destroy_node()
rclpy.shutdown()
return
req = DeviceEmergencyStop.Request()
req.request_id = 'r001'
req.timestamp = time.time()
req.estop_trigger = True
future = client.call_async(req)
rclpy.spin_until_future_complete(node, future, timeout_sec=3.0)
if not future.done():
node.get_logger().error('call timeout')
node.destroy_node()
rclpy.shutdown()
return
if future.exception() is not None:
node.get_logger().error(f'call failed: {future.exception()}')
node.destroy_node()
rclpy.shutdown()
return
resp: DeviceEmergencyStop.Response = future.result()
node.get_logger().info(
f'ok={resp.ok}, error_code={resp.error_code}, request_id={resp.request_id}, '
f'exec_time={resp.execution_time:.4f}, final_status={resp.device_final_status}, '
f'resp_ts={resp.response_timestamp:.3f}'
)
node.destroy_node()
rclpy.shutdown()
-
服务端:创建 service、校验请求、执行业务、填充响应(ok/error_code/request_id/execution_time/device_final_status/response_timestamp)。
-
超时:必须设置(调用失败要能区分:服务不存在/超时/异常)。
-
任务建议:新增一个 srv(如
DeviceSetParam.srv),请求包含device_id/param_name/value/request_id/timestamp,响应包含ok/error_code/request_id/execution_time/device_final_status/response_timestamp。 -
提交物:命令清单 + 关键输出截图/粘贴 + 你们组的字段解释卡。
1. 编译与运行(先服务端后客户端)
cd ~/ros2_ws
colcon build --symlink-install
source /opt/ros/humble/setup.bash
source install/setup.bash
终端 A(启动服务端):
ros2 run sorting_service_practice emergency_stop_server
终端 B(查看服务与类型):
ros2 service list | grep emergency
ros2 service type /sorting/device/emergency_stop
ros2 interface show sorting_interfaces/srv/DeviceEmergencyStop
ros2 service call /sorting/device/emergency_stop sorting_interfaces/srv/DeviceEmergencyStop "{request_id: 'r001', timestamp: 1710000000.0, estop_trigger: true}"
若终端对引号较敏感,可改用单引号包裹 YAML:ros2 service call ... '{request_id: "r001", timestamp: 1710000000.0, estop_trigger: true}'。
失败调用示例(重复指令,期望 ok=false 且 error_code=3):
ros2 service call /sorting/device/emergency_stop sorting_interfaces/srv/DeviceEmergencyStop "{request_id: 'r002', timestamp: 1710000001.0, estop_trigger: true}"
失败调用示例(非法请求,期望 ok=false 且 error_code=1):
ros2 service call /sorting/device/emergency_stop sorting_interfaces/srv/DeviceEmergencyStop "{request_id: '', timestamp: 0.0, estop_trigger: true}"
- 响应里
ok是否明确? error_code是否可复盘(0=成功,1=非法指令,2=设备离线,3=重复指令)?request_id是否原样返回(便于追溯)?device_final_status是否反映“急停后状态”?
1. 接口定义/生成相关(编译时报错)
- 现象:
ros2 interface show找不到自定义 srv - 排查顺序:
1)确认~/ros2_ws/src/sorting_interfaces/srv/*.srv文件名与内容格式正确(必须有---)
2)确认colcon build无报错
3)确认执行过source install/setup.bash(每个新终端都要)
2. 找不到服务/调用超时(运行时最常见)
- 现象:客户端提示
service not available或 ros2 call 卡住 - 排查顺序:
1)ros2 node list看服务端节点是否存在
2)ros2 service list看服务名是否一致(命名空间、前导/)
3)ros2 service type <服务名>看类型是否匹配(srv 名称是否对)
4)确认两端ROS_DOMAIN_ID一致,且都 source 了同一个工作空间
3. 环境/网络隔离问题(“我明明跑了但看不见”)
- 现象:同一条命令在不同终端结果不同
- 排查顺序:
1)每个终端重新source /opt/ros/humble/setup.bash与source ~/ros2_ws/install/setup.bash
2)检查echo $ROS_DOMAIN_ID是否一致
3)确认没有在另一个工作空间/旧 install 下运行
4)用ros2 service list -t(或ros2 service list --show-types)同时查看服务名与类型,确认类型与 srv 一致
4. 权限/系统策略相关(多机或受限环境常见)
-
现象:跨机器能 ping,但
ros2 service list互相看不见;或只在本机可见 -
排查顺序:
1)确认两机在同一网段/路由可达,且ROS_DOMAIN_ID一致
2)临时关闭防火墙或放通 DDS/ROS2 相关端口(不同 DDS 实现端口策略不同)
3)确认没有混用 VPN/多网卡导致发现报文走错接口 -
AI 可以用来生成 srv 模板、节点骨架、分析报错,但必须提交:
-
你向 AI 提问的关键提示词
-
AI 的关键输出(保留原文)
-
你的人工审计与修改点(至少 3 条)
-
你用 ros2 CLI 给出的验证证据(命令 + 关键输出)
提示词模板(可直接复制):
你是 ROS2 Humble 的服务通信开发与调试专家。请基于以下需求输出可运行的实现与自测方法。
需求:分拣设备“急停控制”服务
- 接口规范:请先用 5 条要点总结 srv 接口设计规范(字段、错误码、幂等、可追溯、验收证据)
- 自定义 srv:请给出 .srv 内容(必须包含 request_id/timestamp/estop_trigger 与 execution_time/device_final_status/response_timestamp)
- Python rclpy:请给出服务端与客户端完整代码(关键逻辑处添加简短注释:字段校验、超时、错误码含义、execution_time 计算)
- 约束:客户端必须带超时;服务端必须对非法请求返回 ok=false 与 error_code;服务端必须原样返回 request_id
- 自测:给出 ros2 interface show、ros2 service list/type/call 的命令与期望关键输出(至少包含 1 次成功调用 + 1 次重复指令失败 + 1 次非法请求失败)
- 排错:如果出现“service not available / 找不到接口 / colcon 编译失败”,请按“现象→原因→定位命令→修复步骤→复验命令”输出
注意:不要假设任何文件已经存在;请同时给出推荐的包划分与文件路径(interfaces 包与节点包);涉及依赖必须在 package.xml / CMakeLists.txt 或 setup.py 中体现。